/*
 * Copyright (c) 2022 The red-star Project
 *
 * Licensed under the Apache License, version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at:
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package com.inyourcode.core.monitor;

import com.codahale.metrics.Counter;
import com.codahale.metrics.Counting;
import com.codahale.metrics.Gauge;
import com.codahale.metrics.Histogram;
import com.codahale.metrics.Meter;
import com.codahale.metrics.Metered;
import com.codahale.metrics.MetricAttribute;
import com.codahale.metrics.MetricFilter;
import com.codahale.metrics.MetricRegistry;
import com.codahale.metrics.ScheduledReporter;
import com.codahale.metrics.Snapshot;
import com.codahale.metrics.Timer;
import org.springframework.data.redis.core.HashOperations;
import org.springframework.data.redis.core.RedisTemplate;

import java.util.Collections;
import java.util.Map;
import java.util.SortedMap;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;

import static com.codahale.metrics.MetricAttribute.COUNT;
import static com.codahale.metrics.MetricAttribute.M15_RATE;
import static com.codahale.metrics.MetricAttribute.M1_RATE;
import static com.codahale.metrics.MetricAttribute.M5_RATE;
import static com.codahale.metrics.MetricAttribute.MAX;
import static com.codahale.metrics.MetricAttribute.MEAN;
import static com.codahale.metrics.MetricAttribute.MEAN_RATE;
import static com.codahale.metrics.MetricAttribute.MIN;
import static com.codahale.metrics.MetricAttribute.P50;
import static com.codahale.metrics.MetricAttribute.P75;
import static com.codahale.metrics.MetricAttribute.P95;
import static com.codahale.metrics.MetricAttribute.P98;
import static com.codahale.metrics.MetricAttribute.P99;
import static com.codahale.metrics.MetricAttribute.P999;
import static com.codahale.metrics.MetricAttribute.STDDEV;

/**
 * @author JackLei
 */
public class MetricsRedisReport extends ScheduledReporter {

    private final RedisTemplate redisTemplate;
    private final String prefix;
    private final String nodeName;
    private HashOperations hashOperations;
    private String hashKey;

    public MetricsRedisReport(String nodeName,
                               MetricRegistry registry,
                               RedisTemplate redisTemplate,
                               ScheduledExecutorService executor) {
        super(registry,
                "redis-reporter",
                MetricFilter.ALL,
                TimeUnit.SECONDS,
                TimeUnit.MILLISECONDS,
                executor,
                true,
                Collections.emptySet());
        this.redisTemplate = redisTemplate;
        this.nodeName = nodeName;
        this.prefix = "";
        this.hashOperations = redisTemplate.opsForHash();
        this.hashKey = MetricsConstants.NAME_NODE + nodeName;
    }

    @Override
    public void report(SortedMap<String, Gauge> gauges, SortedMap<String, Counter> counters, SortedMap<String, Histogram> histograms, SortedMap<String, Meter> meters, SortedMap<String, Timer> timers) {

        for (Map.Entry<String, Gauge> entry : gauges.entrySet()) {
            String logGauge = logGauge(entry.getKey(), entry.getValue());
            hashOperations.put(hashKey, entry.getKey(), logGauge);
        }

        for (Map.Entry<String, Counter> entry : counters.entrySet()) {
            String logCounter = logCounter(entry.getKey(), entry.getValue());
            hashOperations.put(hashKey, entry.getKey(), logCounter);
        }

        for (Map.Entry<String, Histogram> entry : histograms.entrySet()) {
            String logHistogram = logHistogram(entry.getKey(), entry.getValue());
            hashOperations.put(hashKey, entry.getKey(), logHistogram);
        }

        for (Map.Entry<String, Meter> entry : meters.entrySet()) {
            String logMeter = logMeter(entry.getKey(), entry.getValue());
            hashOperations.put(hashKey, entry.getKey(), logMeter);
        }

        for (Map.Entry<String, Timer> entry : timers.entrySet()) {
            String logTimer = logTimer(entry.getKey(), entry.getValue());
            hashOperations.put(hashKey, entry.getKey(), logTimer);
        }
    }

    public String logTimer( String name, Timer timer) {
        StringBuilder b = new StringBuilder();
        final Snapshot snapshot = timer.getSnapshot();
        b.setLength(0);
        b.append("type=TIMER");
        append(b, "name", prefix(name));
        appendCountIfEnabled(b, timer);
        appendLongDurationIfEnabled(b, MIN, snapshot::getMin);
        appendLongDurationIfEnabled(b, MAX, snapshot::getMax);
        appendDoubleDurationIfEnabled(b, MEAN, snapshot::getMean);
        appendDoubleDurationIfEnabled(b, STDDEV, snapshot::getStdDev);
        appendDoubleDurationIfEnabled(b, P50, snapshot::getMedian);
        appendDoubleDurationIfEnabled(b, P75, snapshot::get75thPercentile);
        appendDoubleDurationIfEnabled(b, P95, snapshot::get95thPercentile);
        appendDoubleDurationIfEnabled(b, P98, snapshot::get98thPercentile);
        appendDoubleDurationIfEnabled(b, P99, snapshot::get99thPercentile);
        appendDoubleDurationIfEnabled(b, P999, snapshot::get999thPercentile);
        appendMetered(b, timer);
        append(b, "rate_unit", getRateUnit());
        append(b, "duration_unit", getDurationUnit());
        return b.toString();
    }

    public String logMeter(String name, Meter meter) {
        StringBuilder b = new StringBuilder();
        b.setLength(0);
        b.append("type=METER");
        append(b, "name", prefix(name));
        appendCountIfEnabled(b, meter);
        appendMetered(b, meter);
        append(b, "rate_unit", getRateUnit());
        return b.toString();
    }

    public String logHistogram(String name, Histogram histogram) {
        final Snapshot snapshot = histogram.getSnapshot();
        StringBuilder b = new StringBuilder();
        b.setLength(0);
        b.append("type=HISTOGRAM");
        append(b, "name", prefix(name));
        appendCountIfEnabled(b, histogram);
        appendLongIfEnabled(b, MIN, snapshot::getMin);
        appendLongIfEnabled(b, MAX, snapshot::getMax);
        appendDoubleIfEnabled(b, MEAN, snapshot::getMean);
        appendDoubleIfEnabled(b, STDDEV, snapshot::getStdDev);
        appendDoubleIfEnabled(b, P50, snapshot::getMedian);
        appendDoubleIfEnabled(b, P75, snapshot::get75thPercentile);
        appendDoubleIfEnabled(b, P95, snapshot::get95thPercentile);
        appendDoubleIfEnabled(b, P98, snapshot::get98thPercentile);
        appendDoubleIfEnabled(b, P99, snapshot::get99thPercentile);
        appendDoubleIfEnabled(b, P999, snapshot::get999thPercentile);
        return b.toString();
    }

    public String logCounter(String name, Counter counter) {
        StringBuilder b = new StringBuilder();
        b.setLength(0);
        b.append("type=COUNTER");
        append(b, "name", prefix(name));
        append(b, COUNT.getCode(), counter.getCount());
        return b.toString();
    }

    public String logGauge(String name, Gauge<?> gauge) {
        StringBuilder b = new StringBuilder();
        b.setLength(0);
        b.append("type=GAUGE");
        append(b, "name", prefix(name));
        append(b, "value", gauge.getValue());
        return b.toString();
    }

    private void appendLongDurationIfEnabled(StringBuilder b, MetricAttribute metricAttribute,
                                             Supplier<Long> durationSupplier) {
        if (!getDisabledMetricAttributes().contains(metricAttribute)) {
            append(b, metricAttribute.getCode(), convertDuration(durationSupplier.get()));
        }
    }

    private void appendDoubleDurationIfEnabled(StringBuilder b, MetricAttribute metricAttribute,
                                               Supplier<Double> durationSupplier) {
        if (!getDisabledMetricAttributes().contains(metricAttribute)) {
            append(b, metricAttribute.getCode(), convertDuration(durationSupplier.get()));
        }
    }

    private void appendLongIfEnabled(StringBuilder b, MetricAttribute metricAttribute,
                                     Supplier<Long> valueSupplier) {
        if (!getDisabledMetricAttributes().contains(metricAttribute)) {
            append(b, metricAttribute.getCode(), valueSupplier.get());
        }
    }

    private void appendDoubleIfEnabled(StringBuilder b, MetricAttribute metricAttribute,
                                       Supplier<Double> valueSupplier) {
        if (!getDisabledMetricAttributes().contains(metricAttribute)) {
            append(b, metricAttribute.getCode(), valueSupplier.get());
        }
    }

    private void appendCountIfEnabled(StringBuilder b, Counting counting) {
        if (!getDisabledMetricAttributes().contains(COUNT)) {
            append(b, COUNT.getCode(), counting.getCount());
        }
    }

    private void appendMetered(StringBuilder b, Metered meter) {
        appendRateIfEnabled(b, M1_RATE, meter::getOneMinuteRate);
        appendRateIfEnabled(b, M5_RATE, meter::getFiveMinuteRate);
        appendRateIfEnabled(b, M15_RATE,  meter::getFifteenMinuteRate);
        appendRateIfEnabled(b, MEAN_RATE,  meter::getMeanRate);
    }

    private void appendRateIfEnabled(StringBuilder b, MetricAttribute metricAttribute, Supplier<Double> rateSupplier) {
        if (!getDisabledMetricAttributes().contains(metricAttribute)) {
            append(b, metricAttribute.getCode(), convertRate(rateSupplier.get()));
        }
    }

    private void append(StringBuilder b, String key, long value) {
        b.append(", ").append(key).append('=').append(value);
    }

    private void append(StringBuilder b, String key, double value) {
        b.append(", ").append(key).append('=').append(value);
    }

    private void append(StringBuilder b, String key, String value) {
        b.append(", ").append(key).append('=').append(value);
    }

    private void append(StringBuilder b, String key, Object value) {
        b.append(", ").append(key).append('=').append(value);
    }

    @Override
    protected String getRateUnit() {
        return "events/" + super.getRateUnit();
    }

    private String prefix(String... components) {
        return MetricRegistry.name(prefix, components);
    }
}
